#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <sys/mman.h>

#define DBG_SUBSYS S_LIBTASK

#include "recovery.h"
#include "utils.h"

/*
 * Scan state stored in a queue item's scan_status field.
 * Items are created in the INIT state (recovery_mq_item_create) and are
 * switched to OK or FAIL by __scan_mq_done().
 */
typedef enum {
        __S_STATUS_INIT__ = 0, // initial state; counted as "not ready" by __check_scan_status
        __S_STATUS_OK__,       // assigned by __scan_mq_done()
        __S_STATUS_FAIL__,     // assigned by __scan_mq_done()
} recovery_scan_status_t;

/*
 * Hash callback for the chunk-id table (dir or volume ids).
 *
 * args points to a chkid_t; the key is the sum of its id and idx fields
 * converted to uint32_t.
 */
static uint32_t chunk_id_key(const void *args)
{
        const chkid_t *chkid = args;
        uint32_t key = chkid->id + chkid->idx;

        return key;
}

/*
 * Comparison callback for the chunk-id table.
 *
 * v1 is a stored recovery_mq_item_t, v2 is the chkid_t being looked up.
 * Returns the result of chkid_cmp() on the stored id and the lookup id.
 */
static int chunk_id_cmp(const void *v1, const void *v2)
{
        const chkid_t *wanted = v2;
        const recovery_mq_item_t *stored = v1;

        return chkid_cmp(&stored->chkid, wanted);
}


/*
 * Set the status of every segment in the pool to `status`.
 * Segments already marked __R_TH_STOPPED__ are left untouched, and a
 * segment is only written when its current status differs.
 */
static void __recovery_set_thread_status(recovery_tp_t *self, int status)
{
        for (int idx = 0; idx < self->thread_num; idx++) {
                recovery_seg_t *seg = &self->segs[idx];

                if (seg->status != __R_TH_STOPPED__ && seg->status != status)
                        seg->status = status;
        }
}

/*
 * Add n to one of the pool's result counters, under the write lock.
 *
 * t selects the counter:
 *   __T_RESULT_CHECK__   -> check
 *   __T_RESULT_SUCCESS__ -> success and success_total
 *   __T_RESULT_FAIL      -> fail
 * Any other value trips YASSERT(0).
 *
 * Returns 0 on success, or the error from taking the lock.
 */
static int __recovery_tp_result(recovery_tp_t *self, int t, int n)
{
        int ret;

        ret = sy_rwlock_wrlock(&self->rwlock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        switch (t) {
        case __T_RESULT_CHECK__:
                self->check += n;
                break;
        case __T_RESULT_SUCCESS__:
                self->success += n;
                self->success_total += n;
                break;
        case __T_RESULT_FAIL:
                self->fail += n;
                break;
        default:
                YASSERT(0);
                break;
        }

        sy_rwlock_unlock(&self->rwlock);

        return 0;
err_ret:
        return ret;
}

/*
 * "dec" hook of the thread pool.
 *
 * The running-thread decrement is compiled out (#if 0), so at present this
 * only acquires and releases the write lock.
 *
 * Returns 0 on success, or the error from taking the lock.
 */
static int __recovery_tp_dec(recovery_tp_t *self)
{
        int ret = sy_rwlock_wrlock(&self->rwlock);

        if (unlikely(ret))
                GOTO(err_ret, ret);

#if 0
        if (self->running_thread > 0) {
                self->running_thread--;
        }
#endif

        sy_rwlock_unlock(&self->rwlock);

        // ret is 0 here: the lock was taken successfully
err_ret:
        return ret;
}

/*
 * Report whether every segment of the pool is in __R_TH_STOPPED__ state.
 *
 * count: optional out-parameter; when non-NULL it receives the number of
 *        stopped segments.
 *
 * Returns TRUE when the stopped count equals thread_num, FALSE otherwise.
 */
static int __recovery_tp_is_stopped(recovery_tp_t *self, int *count)
{
        int stopped = 0;

        if (count != NULL)
                *count = 0;

        for (int idx = 0; idx < self->thread_num; idx++)
                stopped += (self->segs[idx].status == __R_TH_STOPPED__) ? 1 : 0;

        if (count != NULL)
                *count = stopped;

        return (stopped == self->thread_num) ? TRUE : FALSE;
}

/*
 * Start `num` worker threads for the pool.
 *
 * Resets the result counters, marks the pool as started, then initialises
 * one segment per thread (pool pointer, index, __R_TH_INIT__ status) and
 * hands it to sy_thread_create() as the thread argument.
 *
 * A thread-creation failure goes through UNIMPLEMENTED(__DUMP__).
 * Always returns 0.
 */
static int __recovery_tp_start(recovery_tp_t *self, void *pool, int num, thread_proc proc)
{
        int ret;

        DINFO("tp %s num %d\n", self->name, num);

        self->started = TRUE;
        self->thread_num = num;

        self->speed = 0;
        self->fail = 0;
        self->success = 0;
        self->check = 0;

        // For each pool, start several worker threads while the pool's
        // main thread waits.
        for (int idx = 0; idx < num; idx++) {
                recovery_seg_t *seg = &self->segs[idx];

                seg->pool = pool;
                seg->idx = idx;
                seg->status = __R_TH_INIT__;

                ret = sy_thread_create(proc, (void *)seg);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);
        }

        return 0;
}

/*
 * Ask all worker threads to stop and wait until they have.
 *
 * Sets every non-stopped segment to __R_TH_STOP__, then polls is_stopped()
 * once per second. Once more than 1000 polls have failed, it logs a warning
 * and calls EXIT(EAGAIN). After all threads report stopped, the pool
 * state and counters are reset.
 *
 * Always returns 0.
 */
static int __recovery_tp_stop(recovery_tp_t *self)
{
        int retry, count = 0;

        __recovery_set_thread_status(self, __R_TH_STOP__);

        for (retry = 0; !self->is_stopped(self, &count); retry++) {
                if (retry > 1000) {
                        DWARN("stop %u --> %u, retry %u\n", count, self->thread_num, retry);
                        EXIT(EAGAIN);
                }

                sleep(1);
        }

        self->started = FALSE;
        self->thread_num = 0;

        self->speed = 0;
        self->fail = 0;
        self->success = 0;
        self->check = 0;

        DINFO("exit tp %s (%p) count %d\n", self->name, self, count);
        return 0;
}

/*
 * Initialise a recovery thread pool object.
 *
 * Creates the pool's rwlock, copies `name` into self->name, zeroes the
 * state and counters, and installs the method table (start, stop,
 * is_stopped, dec, result; inc is left NULL).
 *
 * NOTE(review): name is copied with strcpy(); the caller must ensure it
 * fits in self->name.
 *
 * Returns 0 on success, or the error from sy_rwlock_init().
 */
int recovery_tp_init(recovery_tp_t *self, const char *name)
{
        int ret = sy_rwlock_init(&self->rwlock, "recovery_tp.rwlock");

        if (unlikely(ret))
                GOTO(err_ret, ret);

        strcpy(self->name, name);

        // state
        self->started = FALSE;
        self->thread_num = 0;

        // counters
        self->check = 0;
        self->success = 0;
        self->fail = 0;
        self->speed = 0;

        // methods
        self->start = __recovery_tp_start;
        self->stop = __recovery_tp_stop;
        self->is_stopped = __recovery_tp_is_stopped;
        self->inc = NULL;
        self->dec = __recovery_tp_dec;
        self->result = __recovery_tp_result;

        return 0;
err_ret:
        return ret;
}

/*
 * Queue a chunk id for scanning unless it is already known.
 *
 * Under the write lock: if chkid is not in the hash table, a new queue item
 * is created, inserted into the table and appended to the pending list.
 * If it is already present nothing is changed.
 *
 * Returns 0 on success (including the already-present case), or the error
 * from locking, item creation or hash insertion. Every failure is logged.
 */
static int __scan_mq_push(scan_mq_t *self, void *pool, const chkid_t *chkid, const addtion_t *addtion)
{
        int ret = 0;
        recovery_mq_item_t *mq_item;
        recovery_pool_t *_pool = pool;

        ret = sy_rwlock_wrlock(&self->rwlock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // TODO is_volume
        mq_item = hash_table_find(self->chkid_tab, (void *)chkid);
        if (mq_item != NULL)
                goto out; // already queued

        ret = recovery_mq_item_create(&mq_item, pool, chkid, addtion);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        ret = hash_table_insert(self->chkid_tab, (void *)mq_item, &mq_item->chkid, 0);
        if (unlikely(ret))
                GOTO(err_free, ret);

        count_list_add_tail(&mq_item->hook, &self->chkid_list);

        DINFO("scanhash push pool %s "CHKID_FORMAT" count %u\n", _pool->name, CHKID_ARG(chkid), self->chkid_list.count);

out:
        sy_rwlock_unlock(&self->rwlock);
        return 0;
err_free:
        yfree((void **)&mq_item);
err_lock:
        sy_rwlock_unlock(&self->rwlock);
err_ret:
        DWARN("check "CHKID_FORMAT" fail, ret %d\n", CHKID_ARG(chkid), ret);
        return ret;
}

/*
 * Take the first chunk id off the pending list.
 *
 * Under the write lock, unlinks the head entry of chkid_list and copies
 * its id into *chkid. The item itself stays in the hash table.
 *
 * Returns 0 on success, ENOENT when the list is empty, or the error from
 * taking the lock.
 */
static int __scan_mq_pop(scan_mq_t *self, chkid_t *chkid)
{
        int ret;
        struct list_head *head;
        recovery_mq_item_t *first;

        ret = sy_rwlock_wrlock(&self->rwlock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (self->chkid_list.count == 0) {
                ret = ENOENT;
                goto err_lock;
        }

        head = self->chkid_list.list.next;
        first = list_entry(head, recovery_mq_item_t, hook);

        count_list_del_init(head, &self->chkid_list);
        *chkid = first->chkid;

        DINFO("pop vol "CHKID_FORMAT" count %u\n", CHKID_ARG(chkid), self->chkid_list.count);

        sy_rwlock_unlock(&self->rwlock);
        return 0;
err_lock:
        sy_rwlock_unlock(&self->rwlock);
err_ret:
        return ret;
}

/*
 * Append one record buffer to the queue item registered for chkid.
 *
 * Under the write lock, looks the item up in the hash table, forwards
 * buf/len to the item's write() method and increments need_recovery.
 * A missing item or a failed write goes through UNIMPLEMENTED(__DUMP__).
 *
 * Returns 0, or the error from taking the lock.
 */
static int __scan_mq_write(scan_mq_t *self, const chkid_t *chkid, void *buf, int len)
{
        int ret;
        recovery_mq_item_t *target;

        ret = sy_rwlock_wrlock(&self->rwlock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        target = hash_table_find(self->chkid_tab, (void *)chkid);
        if (target == NULL) {
                UNIMPLEMENTED(__DUMP__);
        } else {
                ret = target->write(target, buf, len);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                self->need_recovery++;
        }

        sy_rwlock_unlock(&self->rwlock);
        return 0;
err_ret:
        return ret;
}

/*
 * Record that scanning of chkid has finished.
 *
 * ret is the scan result supplied by the caller: 0 marks the item
 * __S_STATUS_OK__, anything else marks it __S_STATUS_FAIL__. Before the
 * status is set, the item's buffered records are flushed (the flush return
 * value is not checked here).
 *
 * The lock result is kept in its own variable so that it does not overwrite
 * the caller's scan result; previously the parameter was reused, which made
 * every item end up as __S_STATUS_OK__.
 *
 * A missing item goes through UNIMPLEMENTED(__DUMP__).
 *
 * Returns 0, or the error from taking the lock.
 */
static int __scan_mq_done(scan_mq_t *self, const chkid_t *chkid, int ret)
{
        int lock_ret;
        recovery_mq_item_t *mq_item;

        DINFO("chunk "CHKID_FORMAT" ret:%d\n", CHKID_ARG(chkid), ret);

        lock_ret = sy_rwlock_wrlock(&self->rwlock);
        if (unlikely(lock_ret))
                GOTO(err_ret, lock_ret);

        mq_item = hash_table_find(self->chkid_tab, (void *)chkid);
        if (mq_item) {
                mq_item->flush(mq_item);
                mq_item->scan_status = (ret == 0) ? __S_STATUS_OK__ : __S_STATUS_FAIL__;
        } else {
                // TODO
                UNIMPLEMENTED(__DUMP__);
        }

        sy_rwlock_unlock(&self->rwlock);
        return 0;
err_ret:
        return lock_ret;
}

/*
 * Copy the addtion_t stored with chkid's queue item into *addtion.
 *
 * Runs under the read lock. The item must exist; a missing item trips
 * YASSERT.
 *
 * Returns 0, or the error from taking the lock.
 */
static int __scan_mq_get_addtion(scan_mq_t *self, const chkid_t *chkid, addtion_t *addtion)
{
        recovery_mq_item_t *mq_item;
        int ret = sy_rwlock_rdlock(&self->rwlock);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        mq_item = hash_table_find(self->chkid_tab, (void *)chkid);
        YASSERT(mq_item != NULL);

        *addtion = mq_item->addtion;

        sy_rwlock_unlock(&self->rwlock);
        return 0;
err_ret:
        return ret;
}

/*
 * Number of entries currently on the pending chunk-id list.
 * Read without taking the lock.
 */
static int __scan_mq_get_length(scan_mq_t *self)
{
        int pending = self->chkid_list.count;

        return pending;
}

/*
 * hash_iterate_table_entries() callback used by __scan_mq_is_ready().
 *
 * arg: int counter supplied by the caller.
 * ent: the recovery_mq_item_t being visited.
 *
 * Increments the counter for each item still in __S_STATUS_INIT__.
 * Always returns 0.
 */
static int __check_scan_status(void *arg, void *ent)
{
        int *pending = arg;
        const recovery_mq_item_t *visited = ent;

        if (visited->scan_status == __S_STATUS_INIT__)
                *pending += 1;

        return 0;
}

/*
 * Report whether every item in the hash table has left the INIT scan state.
 *
 * Walks the table with __check_scan_status to count items that are still
 * __S_STATUS_INIT__. An iteration error goes through UNIMPLEMENTED(__DUMP__).
 *
 * Returns TRUE when no such item remains, FALSE otherwise.
 */
static int __scan_mq_is_ready(scan_mq_t *self)
{
        int ret;
        int pending = 0;

        ret = hash_iterate_table_entries(self->chkid_tab, __check_scan_status, &pending);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        if (pending == 0)
                return TRUE;

        return FALSE;
}

/*
 * Remove chkid's item from the hash table and destroy it.
 * The caller must already hold self->rwlock for writing.
 *
 * flag: when non-zero, the matching entry is first unlinked from the
 *       pending list (the item may still be queued). When zero the list
 *       is not touched.
 *
 * `item` starts out NULL and the list walk uses its own cursor variable, so
 * that only the pointer handed back by hash_table_remove() can be destroyed.
 * Previously `item` was uninitialised (flag == 0) or pointed at whichever
 * list entry was inspected last (flag != 0) when passed to
 * hash_table_remove().
 *
 * A failed removal goes through UNIMPLEMENTED(__DUMP__). Always returns 0.
 */
static int __scan_mq_free_item_nolock(scan_mq_t *self, const chkid_t *chkid, int flag) {
        int ret;
        recovery_mq_item_t *item = NULL;

        if (flag) {
                struct list_head *pos, *n;

                DWARN("chunk "CHKID_FORMAT" is removed\n", CHKID_ARG(chkid));
                list_for_each_safe(pos, n, &self->chkid_list.list) {
                        recovery_mq_item_t *queued = list_entry(pos, recovery_mq_item_t, hook);

                        if (chkid_cmp(&queued->chkid, chkid) == 0) {
                                count_list_del_init(pos, &self->chkid_list);
                                break;
                        }
                }
        }

        DINFO("scanhash chunk "CHKID_FORMAT"\n", CHKID_ARG(chkid));

        ret = hash_table_remove(self->chkid_tab, chkid, (void **)&item);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        if (item)
                item->destroy(item);

        return 0;
}

/*
 * Locked wrapper around __scan_mq_free_item_nolock().
 *
 * Returns 0 on success, or the error from taking the lock or from the
 * nolock helper.
 */
static int __scan_mq_free_item(scan_mq_t *self, const chkid_t *chkid, int flag) {
        int ret;

        ret = sy_rwlock_wrlock(&self->rwlock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __scan_mq_free_item_nolock(self, chkid, flag);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        // success path shares the unlock below; ret is 0 here
err_lock:
        sy_rwlock_unlock(&self->rwlock);
err_ret:
        return ret;
}

/*
 * hash_iterate_table_entries() callback: append the visited item's chunk id
 * to the vec_chkid_t passed as arg. Always returns 0.
 */
static int __check_all_item(void *arg, void *ent) {
        vec_chkid_t *ids = arg;
        recovery_mq_item_t *visited = ent;

        vec_push(ids, visited->chkid);

        return 0;
}

/*
 * hash_iterate_table_entries() callback: collect ids of volumes that are
 * being deleted.
 *
 * For items whose chunk id is a volume, asks vol_is_deleting(); when that
 * call succeeds and reports deletion, the id is appended to the
 * vec_chkid_t passed as arg. Errors from vol_is_deleting() are ignored.
 * Always returns 0.
 */
static int __check_deleted_volume(void *arg, void *ent)
{
        int ret, deleted = 0;
        vec_chkid_t *doomed = arg;
        recovery_mq_item_t *visited = ent;
        chkid_t *chkid = &visited->chkid;

        if (!is_volume(chkid))
                return 0;

        ret = vol_is_deleting(chkid, &deleted);
        if (likely(ret == 0) && deleted) {
                DINFO("chunk "CHKID_FORMAT" is deleted.\n", CHKID_ARG(chkid));
                vec_push(doomed, *chkid);
        }

        return 0;
}

/*
 * Drop queue items that belong to volumes being deleted.
 *
 * First collects the affected ids by iterating the hash table with
 * __check_deleted_volume, then frees each one (also unlinking it from the
 * pending list). Failures to free an individual item are only logged.
 *
 * Returns 0, or the error from the table iteration.
 */
static int __scan_mq_check_deleted_volume(scan_mq_t *self)
{
        int ret, i;
        vec_chkid_t doomed;
        chkid_t chkid;

        vec_init(&doomed);

        ret = hash_iterate_table_entries(self->chkid_tab, __check_deleted_volume, &doomed);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        vec_foreach(&doomed, chkid, i) {
                int free_ret = __scan_mq_free_item(self, &chkid, 1);

                if (unlikely(free_ret)) {
                        DWARN("ret %d\n", free_ret);
                }
        }

        // ret is still 0 from the successful iteration; share the cleanup
err_ret:
        vec_deinit(&doomed);
        return ret;
}

/*
 * Run func(ctx, item) over every entry of the chunk-id hash table.
 * Logs the table's entry count first.
 *
 * Returns 0, or the error reported by hash_iterate_table_entries().
 */
static int __scan_mq_vol_iterator(scan_mq_t *self, int (*func)(void *, void *), void *ctx)
{
        int ret;

        DINFO("count %d\n", self->chkid_tab->num_of_entries);

        ret = hash_iterate_table_entries(self->chkid_tab, func, ctx);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // ret is 0 here
err_ret:
        return ret;
}

/*
 * hash_iterate_table_entries() callback: detect volumes that were deleted
 * or changed.
 *
 * Returns EAGAIN when vol_is_deleting() reports the chunk as deleted, or
 * when it fails with EREMCHG. Any other failure is logged and treated as
 * success (0). arg is unused.
 */
static int __recovery_check_change_volume(void *arg, void *ent)
{
        int ret, deleted = 0;
        recovery_mq_item_t *visited = ent;
        recovery_pool_t *pool = visited->pool;
        chkid_t *chkid = &visited->chkid;

        (void) arg;

        ret = vol_is_deleting(chkid, &deleted);
        if (ret != 0) {
                DWARN("pool %s chunk "CHKID_FORMAT" ret %d\n",
                      pool->name, CHKID_ARG(chkid), ret);
                if (ret == EREMCHG) {
                        ret = EAGAIN;
                        GOTO(err_ret, ret);
                }

                return 0;
        }

        if (deleted) {
                ret = EAGAIN;
                DWARN("pool %s chunk "CHKID_FORMAT" is deleted, ret %d\n",
                      pool->name, CHKID_ARG(chkid), ret);
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Run __recovery_check_change_volume over every item in the hash table.
 *
 * Returns 0, or the error reported by the table iteration.
 */
static int __scan_mq_check_change_volume(scan_mq_t *self)
{
        int ret = hash_iterate_table_entries(self->chkid_tab, __recovery_check_change_volume, NULL);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        // ret is 0 here
err_ret:
        return ret;
}

/*
 * Empty the scan queue without destroying the hash table itself.
 *
 * Under the write lock: unlinks every entry of the pending list, collects
 * all ids in the hash table, removes and destroys each item, and zeroes
 * the offline/lost/need_recovery counters. Both containers are asserted
 * empty afterwards.
 *
 * A lock failure goes through UNIMPLEMENTED(__DUMP__). Always returns 0.
 */
static int __scan_mq_deinit(scan_mq_t *self)
{
        int ret, i;
        chkid_t chkid;
        vec_chkid_t ids;
        struct list_head *pos, *n;

        DINFO("hash %d count %d\n", self->chkid_tab->num_of_entries, self->chkid_list.count);

        ret = sy_rwlock_wrlock(&self->rwlock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        // detach everything from the pending list first
        list_for_each_safe(pos, n, &self->chkid_list.list) {
                count_list_del_init(pos, &self->chkid_list);
        }

        // DONOT destroy hashtable: snapshot the ids, then free item by item
        vec_init(&ids);
        hash_iterate_table_entries(self->chkid_tab, __check_all_item, &ids);
        vec_foreach(&ids, chkid, i) {
                __scan_mq_free_item_nolock(self, &chkid, 0);
        }
        vec_deinit(&ids);

        self->need_recovery = 0;
        self->lost = 0;
        self->offline = 0;

        YASSERT(self->chkid_list.count == 0);
        YASSERT(self->chkid_tab->num_of_entries == 0);

        sy_rwlock_unlock(&self->rwlock);
        return 0;
}

/*
 * Tear down the scan queue, including the hash table.
 *
 * Under the write lock: unlinks every entry of the pending list, destroys
 * the hash table (if any) and clears the pointer, and zeroes the counters.
 *
 * NOTE(review): hash_destroy_table() is given a NULL second argument; whether
 * the items still stored in the table are freed depends on that function —
 * confirm.
 *
 * A lock failure goes through UNIMPLEMENTED(__DUMP__). Always returns 0.
 */
static int __scan_mq_destroy(scan_mq_t *self)
{
        struct list_head *pos, *n;
        int ret = sy_rwlock_wrlock(&self->rwlock);

        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        list_for_each_safe(pos, n, &self->chkid_list.list) {
                count_list_del_init(pos, &self->chkid_list);
        }

        if (self->chkid_tab != NULL) {
                hash_destroy_table(self->chkid_tab, NULL);
                self->chkid_tab = NULL;
        }

        self->need_recovery = 0;
        self->lost = 0;
        self->offline = 0;

        YASSERT(self->chkid_list.count == 0);
        YASSERT(self->chkid_tab == NULL);

        sy_rwlock_unlock(&self->rwlock);
        return 0;
}

/*
 * Initialise a scan queue object.
 *
 * Creates the rwlock, a hash table named "<name>_vol" keyed by chunk id,
 * and an empty pending list; zeroes the counters and installs the method
 * table.
 *
 * The table name is built with snprintf() bounded by the size of the local
 * buffer, so an over-long `name` is truncated instead of overflowing the
 * stack buffer.
 *
 * A failure to create the lock goes through UNIMPLEMENTED(__DUMP__).
 * Returns 0 on success, ENOMEM when the hash table cannot be created.
 */
int scan_mq_init(scan_mq_t *self, const char *name)
{
        int ret;
        char path[MAX_NAME_LEN];

        ret = sy_rwlock_init(&self->rwlock, "scan_mq.rwlock");
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        memset(path, 0, sizeof(path));
        snprintf(path, sizeof(path), "%s_vol", name);
        self->chkid_tab = hash_create_table(chunk_id_cmp, chunk_id_key, path);
        if (self->chkid_tab == NULL) {
                ret = ENOMEM;
                GOTO(err_ret, ret);
        }

        count_list_init(&self->chkid_list);

        self->offline = 0;
        self->lost = 0;
        self->need_recovery = 0;

        self->push = __scan_mq_push;
        self->pop = __scan_mq_pop;

        self->get_addtion = __scan_mq_get_addtion;
        self->get_length = __scan_mq_get_length;
        self->is_ready = __scan_mq_is_ready;
        self->check_deleted_volume = __scan_mq_check_deleted_volume;
        self->check_change_volume = __scan_mq_check_change_volume;
        self->vol_iterator = __scan_mq_vol_iterator;

        self->write = __scan_mq_write;
        self->done = __scan_mq_done;

        self->deinit = __scan_mq_deinit;
        self->destroy = __scan_mq_destroy;

        return 0;
err_ret:
        return ret;
}

/*
 * Append a record-file item to the recovery queue.
 *
 * Under the write lock: dumps the item to the log, links it at the tail of
 * file_list and adds its record_count to the queue's recovery counter.
 *
 * Returns 0, or the error from taking the lock.
 */
static int __mq_push(recovery_mq_t *self, recovery_mq_item_t *item)
{
        int ret = sy_rwlock_wrlock(&self->rwlock);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        item->dump(item, "push");

        count_list_add_tail(&item->hook, &self->file_list);
        self->recovery += item->record_count;

        sy_rwlock_unlock(&self->rwlock);

        // ret is 0 here
err_ret:
        return ret;
}

/*
 * Unlink the first item of file_list and hand it back through *item.
 * *item is set to NULL when the list is empty.
 *
 * Takes no lock itself; __mq_pop2() calls it while holding the write lock.
 * Always returns 0.
 */
static int __mq_pop(recovery_mq_t *self, recovery_mq_item_t **item)
{
        struct list_head *head;
        int count;

        *item = NULL;

        if (!(self->file_list.count > 0))
                return 0;

        count = self->file_list.count;

        head = self->file_list.list.next;
        count_list_del_init(head, &self->file_list);

        // !!!NOTE: ownership of the unlinked item passes to the caller
        *item = list_entry(head, recovery_mq_item_t, hook);

        YASSERT(count == self->file_list.count + 1);

        return 0;
}

/*
 * Fetch up to `len` records from the recovery queue into vector v.
 *
 * The queue keeps one "current" item in self->mq_item whose record file is
 * mmap'ed read-only; records are copied out of the mapping one at a time
 * and the item's cursor advances. When the current item is exhausted it is
 * destroyed and the next item is popped from file_list and mapped.
 *
 * Runs under the write lock.
 *
 * Returns 0 when `len` records were appended, or when the queue ran dry
 * and v is non-empty (v->length > 0, which also counts entries that were
 * already in v on entry). Returns ENOENT when the queue ran dry and v is
 * empty, or the error from locking / pop. An mmap failure goes through
 * UNIMPLEMENTED(__DUMP__).
 */
static int __mq_pop2(recovery_mq_t *self, int len, vec_rec_t *v)
{
        int ret;
        recovery_mq_item_t *item;

        ret = sy_rwlock_wrlock(&self->rwlock);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        int count = 0;
        rec_t *rec = NULL;
        while (count < len) {
                if (self->mq_item == NULL) {
                        // no current item: pop the next one and map its file
                        ret = self->pop(self, &item);
                        if (unlikely(ret))
                                GOTO(err_lock, ret);

                        if (item == NULL) {
                                // queue is empty: partial result is still a success
                                if (v->length > 0) {
                                        goto out;
                                } else {
                                        ret = ENOENT;
                                        goto err_lock;
                                }
                        }

                        // a freshly popped item must be flushed but never mapped or read
                        YASSERT(item->fd != -1);
                        YASSERT(item->record_count > 0);
                        YASSERT(item->cursor == 0);
                        YASSERT(item->addr == NULL);

                        // load: map record_count records read-only from the item's file
                        item->addr = mmap(NULL, item->record_count * sizeof(rec_t),
                                          PROT_READ, MAP_PRIVATE, item->fd, 0);
                        if (item->addr == MAP_FAILED) {
                                ret = errno;
                                UNIMPLEMENTED(__DUMP__);
                        }

                        self->mq_item = item;
                }

                YASSERT(self->mq_item != NULL);

                item = self->mq_item;

                DBUG("count %d/%d item %p cursor %d/%d\n",
                      count, len, item, item->cursor, item->record_count);

                if (item->cursor < item->record_count) {
                        // copy the record at the cursor out of the mapping, then advance
                        rec = item->addr + (item->cursor++) * sizeof(rec_t);

                        vec_push(v, *rec);
                        count++;

                        DBUG("count %d len %d rec "CHKID_FORMAT"\n",
                              count, v->length, CHKID_ARG(&rec->chkid));
                } else {
                        // current item fully consumed: release it and pop another next round
                        item->destroy(item);
                        self->mq_item = NULL;
                }
        }

out:
        sy_rwlock_unlock(&self->rwlock);
        return 0;

err_lock:
        sy_rwlock_unlock(&self->rwlock);
err_ret:
        return ret;
}

/*
 * Number of items currently linked on file_list.
 * Read without taking the lock.
 */
static int __mq_get_length(recovery_mq_t *self)
{
        int queued = self->file_list.count;

        return queued;
}

/*
 * Release everything queued in the recovery queue.
 *
 * Under the write lock: destroys every item on file_list and the current
 * item (self->mq_item), then zeroes the recovery counter. The rwlock
 * itself is not destroyed here.
 *
 * A lock failure goes through UNIMPLEMENTED(__DUMP__). Always returns 0.
 */
static int __mq_deinit(recovery_mq_t *self)
{
        int ret;
        struct list_head *pos, *n;
        // @todo ONLY release scan/recovery resources

        DINFO("count %d\n", self->file_list.count);

        ret = sy_rwlock_wrlock(&self->rwlock);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        list_for_each_safe(pos, n, &self->file_list.list) {
                recovery_mq_item_t *queued = list_entry(pos, recovery_mq_item_t, hook);

                // NOTE!!! The entry MUST be unlinked before the item is
                // destroyed; destroying first leads to undefined behaviour.
                count_list_del_init(pos, &self->file_list);
                queued->destroy(queued);
        }

        if (self->mq_item) {
                recovery_mq_item_t *current = self->mq_item;

                current->destroy(current);
                self->mq_item = NULL;
        }

        self->recovery = 0;

        YASSERT(self->file_list.count == 0);
        YASSERT(self->mq_item == NULL);

        sy_rwlock_unlock(&self->rwlock);
        return 0;
}

/*
 * Initialise a recovery queue object.
 *
 * Creates the rwlock, an empty file_list with no current item, zeroes the
 * recovery counter and installs the method table.
 *
 * Returns 0 on success, or the error from sy_rwlock_init().
 */
int recovery_mq_init(recovery_mq_t *self)
{
        int ret = sy_rwlock_init(&self->rwlock, "recovery_mq.rwlock");

        if (unlikely(ret))
                GOTO(err_ret, ret);

        // state
        self->mq_item = NULL;
        self->recovery = 0;
        count_list_init(&self->file_list);

        // methods
        self->push = __mq_push;
        self->pop = __mq_pop;
        self->pop2 = __mq_pop2;
        self->get_length = __mq_get_length;
        self->deinit = __mq_deinit;

        return 0;
err_ret:
        return ret;
}

/*
 * Append one record buffer to the item's backing file.
 *
 * On the first write the backing file is created from the template
 * "/dev/shm/lich4/recovery/<pool>-<chkid>-XXXXXX" via _mkstemp(). Each call
 * counts as one record; once record_count reaches record_max the item is
 * flushed.
 *
 * errno is captured immediately after a failed write(), before
 * UNIMPLEMENTED() runs, because anything that macro does may overwrite it.
 * The result of write() is kept as ssize_t, its actual return type.
 *
 * NOTE(review): a short write (fewer than len bytes) is still counted as a
 * full record, and the file name is built with an unbounded sprintf();
 * confirm both against the callers and the size of self->filename.
 *
 * Returns 0 on success, the errno of a failed write(), or the error from
 * flush().
 */
static int __mq_item_write(recovery_mq_item_t *self, void *buf, int len)
{
        int ret = 0;
        ssize_t written;
        recovery_pool_t *pool = self->pool;

        if (self->fd == -1) {
                sprintf(self->filename, "/dev/shm/lich4/recovery/%s-%s-XXXXXX",
                        pool->name, id2str(&self->chkid));
                self->fd = _mkstemp(self->filename);
        }

        YASSERT(self->fd != -1);

        written = write(self->fd, buf, len);
        if (written == -1) {
                ret = errno; // save before any other call can clobber it
                UNIMPLEMENTED(__DUMP__);
                GOTO(err_ret, ret);
        }

        self->record_count++;

        if (self->record_count >= self->record_max) {
                ret = self->flush(self);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Allocate a shallow copy of a queue item.
 *
 * Every field (including fd, addr and the method pointers) is copied
 * as-is; only the list hook of the copy is re-initialised so the copy is
 * not linked into any list.
 *
 * *new is NULL on failure. Returns 0, or the error from ymalloc().
 */
static int __mq_item_copy(recovery_mq_item_t *self, recovery_mq_item_t **new)
{
        int ret;
        recovery_mq_item_t *clone;

        *new = NULL;

        ret = ymalloc((void **)&clone, sizeof(recovery_mq_item_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memcpy(clone, self, sizeof(recovery_mq_item_t));
        INIT_LIST_HEAD(&clone->hook);

        *new = clone;
        return 0;
err_ret:
        return ret;
}

/*
 * Put the item back into its "no backing file" state: fd = -1, no
 * mapping, cursor and record_count at 0. Nothing is closed or unmapped
 * here. Always returns 0.
 */
static int __mq_item_reset(recovery_mq_item_t *self)
{
        self->record_count = 0;
        self->cursor = 0;
        self->addr = NULL;
        self->fd = -1;

        return 0;
}

/*
 * Hand the item's accumulated records over to the pool's recovery queue.
 *
 * When the item holds at least one record, a shallow copy (carrying the
 * current fd and record_count) is pushed onto pool->rmq and the original
 * is reset so that it starts a new backing file on the next write.
 * Does nothing when record_count is 0.
 *
 * A copy failure goes through UNIMPLEMENTED(__DUMP__).
 * Returns 0, or the error from the queue push (the copy is freed then).
 */
static int __mq_item_flush(recovery_mq_item_t *self)
{
        int ret;
        recovery_pool_t *pool = self->pool;
        recovery_mq_t *rmq = &pool->rmq;
        recovery_mq_item_t *new;

        if (!(self->record_count > 0))
                return 0;

        YASSERT(self->fd >= 0);

        ret = self->copy(self, &new);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        self->dump(self, "old");
        new->dump(new, "new");

        ret = rmq->push(rmq, new);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        self->reset(self);

        return 0;
err_ret:
        yfree((void **)&new);
        return ret;
}

/*
 * Log the item's state at info level.
 *
 * name is a caller-chosen tag printed after the pool name. The three
 * "count" values are record_max, record_count and cursor, in that order.
 */
static void __mq_item_dump(recovery_mq_item_t *self, const char *name)
{
        recovery_pool_t *owner = self->pool;

        DINFO("name %s/%s chunk "CHKID_FORMAT" item %p fd %d (%s) count %d/%d/%d addr %p\n",
              owner->name, name,
              CHKID_ARG(&self->chkid),
              self,
              self->fd, self->filename,
              self->record_max, self->record_count, self->cursor,
              self->addr);
}

/*
 * Release an item: close its fd if open, unmap its record mapping if
 * present (record_count * sizeof(rec_t) bytes), then free the item itself.
 * The backing file is not unlinked here. Always returns 0.
 */
static int __mq_item_destroy(recovery_mq_item_t *self)
{
        int has_fd = (self->fd != -1);

        if (has_fd) {
                close(self->fd);
                self->fd = -1;
        }

        if (self->addr != NULL) {
                munmap(self->addr, self->record_count * sizeof(rec_t));
                self->addr = NULL;
        }

        yfree((void **)&self);
        return 0;
}

/*
 * Allocate and initialise a queue item for one chunk.
 *
 * The item starts zero-filled, with copies of *chkid and *addtion, scan
 * status __S_STATUS_INIT__, no backing file (fd = -1), no mapping, and a
 * record limit of RECOVERY_RMQ_MAX_RECORD. The method table is installed.
 *
 * *item is NULL on failure. Returns 0, or the error from ymalloc().
 */
int recovery_mq_item_create(recovery_mq_item_t **item, recovery_pool_t *pool,
                            const chkid_t *chkid, const addtion_t *addtion)
{
        int ret;
        recovery_mq_item_t *fresh;

        *item = NULL;

        ret = ymalloc((void **)&fresh, sizeof(recovery_mq_item_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(fresh, 0x0, sizeof(recovery_mq_item_t));

        // identity
        fresh->pool = pool;
        fresh->chkid = *chkid;
        fresh->addtion = *addtion;
        fresh->scan_status = __S_STATUS_INIT__;

        // backing file / mapping state
        fresh->fd = -1;
        fresh->addr = NULL;
        fresh->cursor = 0;
        fresh->record_count = 0;
        fresh->record_max = RECOVERY_RMQ_MAX_RECORD;

        // methods
        fresh->write = __mq_item_write;
        fresh->copy = __mq_item_copy;
        fresh->reset = __mq_item_reset;
        fresh->flush = __mq_item_flush;
        fresh->dump = __mq_item_dump;
        fresh->destroy = __mq_item_destroy;

        *item = fresh;
        return 0;
err_ret:
        return ret;
}
